Limit pages size to a configurable limit #14994

Merged: 12 commits, Oct 12, 2023

Conversation

Contributor

@cryptoe cryptoe commented Sep 15, 2023

Adds the ability to limit the page size of select queries.

  • We piggyback on the same machinery that is used to control numRowsPerSegment.
  • This patch introduces a new context parameter, rowsPerPage, with a default value of 100000 rows.
  • This patch also adds the final selectResults stage only when the previous stage has sorted outputs. Previously, every select query with selectDestination=durableStorage got this extra selectResults stage.
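
As an illustration of how a client might set the new parameter, here is a minimal Java sketch that builds (but does not send) a statements request. The context keys executionMode, selectDestination, and rowsPerPage and the /druid/v2/sql/statements path are the ones mentioned in this thread; the class name, the helper methods, and the base URL are hypothetical, and the JSON is assembled naively on the assumption that the SQL needs no escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Druid.
final class RowsPerPageRequest {
    // Builds the JSON body. Assumes `sql` contains no quotes or backslashes.
    static String body(String sql, int rowsPerPage) {
        return "{\"query\":\"" + sql + "\",\"context\":{"
            + "\"executionMode\":\"ASYNC\","
            + "\"selectDestination\":\"durableStorage\","
            + "\"rowsPerPage\":" + rowsPerPage + "}}";
    }

    // Builds the POST request; `baseUrl` is whatever host serves the SQL API.
    static HttpRequest build(String baseUrl, String sql, int rowsPerPage) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/druid/v2/sql/statements"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body(sql, rowsPerPage)))
            .build();
    }
}
```

Sending the request and polling the statement status are left out on purpose; only the context shape is shown.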

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • [] a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Documentation Area - Batch Ingestion Area - Querying Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Sep 15, 2023
Contributor

@LakshSingla LakshSingla left a comment

I've done a partial review, but one question came up: it seems we are adding a new stage to partition by page size. I don't think this is optimal, since adding a new stage means shuffling again.
When fault tolerance is enabled, this also means an extra round of upload and download. That carries a performance penalty, since this feature is intended for querying from deep storage, where the result set would be huge.
If the above is true, I think we should look for an alternative to creating a new stage.

Comment on lines 218 to 230
.setExpectedCountersForStageWorkerChannel(
    CounterSnapshotMatcher
        .with().rows(6).frames(1),
    0, 0, "output"
)
.setExpectedCountersForStageWorkerChannel(
    CounterSnapshotMatcher
        .with().rows(6).frames(1),
    0, 0, "shuffle"
)
Contributor

Why are we removing the assertion?

Contributor Author

It was more of an in-flux thing. Fixed it.

return channelCounters;
}

public static class TestFrame extends Frame
Contributor

Suggested change
- public static class TestFrame extends Frame
+ private static class TestFrame extends Frame

Let's not create a separate class for a single use

Contributor Author

I just removed this class and went with mocks.

@@ -105,7 +105,7 @@ public class Frame
   private final int numRegions;
   private final boolean permuted;

-  private Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted)
+  protected Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted)
Contributor

@LakshSingla LakshSingla Sep 21, 2023

Suggested change
- protected Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted)
+ private Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted)

Please revert this change. We shouldn't change the access level in the main class to allow testing. At worst, we can mock the class in the tests if we want to assert on a case that should never be possible and for which we are only setting up a defensive check.

This effectively annotates the constructor with @VisibleForTesting without saying so.
#11848 (comment)

);
if (finalShuffleStageDef.doesSortDuringShuffle()) {
final QueryDefinitionBuilder builder = QueryDefinition.builder();
for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
Contributor

nit: could use org.apache.druid.msq.kernel.QueryDefinitionBuilder#addAll(org.apache.druid.msq.kernel.QueryDefinition) here

  // we add a final stage which generates one partition per worker.
- shuffleSpecFactory = ShuffleSpecFactories.globalSortWithMaxPartitionCount(tuningConfig.getMaxNumWorkers());
+ shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(
+     MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
Contributor

I might be missing something elsewhere in this PR, but doesn't GlobalSortTargetSizeShuffleSpec enforce the limit on the total partition size summed across all workers? Since we create a new page for each worker-partition combination, would the limit be enforced per page?

Contributor Author

Yes, GlobalSortTargetSizeShuffleSpec enforces the limit on partition size globally.

There are two cases:

  1. If the last stage is a group-by post-shuffle stage, each partition is present on exactly one worker. Hence the page size controls the number of rows in that partition.

  2. If the last stage is a scan stage, we add a new QueryResultFrameProcessor, since the data needs to be sorted on the boost column. The QueryResultFrameProcessor merges the results for the same partition and writes out a single partition. Since the size-based partition cuts are made globally, in the controller, the final partition size equals the configured page size.
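
A simplified sketch of the idea behind the two cases above: per-worker row counts are first summed into global counts, and partition boundaries are then cut on those global counts so that each partition stays at or below the target. This is an illustration in plain Java, not Druid's implementation; the class, the method names, and the notion of pre-bucketed, globally sorted key ranges are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of size-based cuts made on global (all-worker) counts.
final class GlobalTargetSizeCuts {
    // Sums rows[worker][bucket] across workers: the global row count per sorted key bucket.
    static long[] globalRowsPerBucket(long[][] rows) {
        int numBuckets = 0;
        for (long[] workerRows : rows) {
            numBuckets = Math.max(numBuckets, workerRows.length);
        }
        long[] total = new long[numBuckets];
        for (long[] workerRows : rows) {
            for (int b = 0; b < workerRows.length; b++) {
                total[b] += workerRows[b];
            }
        }
        return total;
    }

    // Greedily groups consecutive buckets into partitions of at most targetRows rows.
    // A single bucket larger than targetRows becomes its own oversized partition.
    static List<Long> cut(long[] globalRows, long targetRows) {
        if (targetRows <= 0) {
            throw new IllegalArgumentException("targetRows must be positive");
        }
        List<Long> partitionSizes = new ArrayList<>();
        long current = 0;
        for (long bucketRows : globalRows) {
            if (current > 0 && current + bucketRows > targetRows) {
                partitionSizes.add(current);
                current = 0;
            }
            current += bucketRows;
        }
        if (current > 0) {
            partitionSizes.add(current);
        }
        return partitionSizes;
    }
}
```

Because the cut uses global counts, a partition only maps to a page of that size when all of its rows end up on one worker, which is what the two cases above argue.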

Contributor Author

I have added a new test case, testExternSelectWithMultipleWorkers. You can look at the counter checks to see what's happening with a scan query.

Contributor

@LakshSingla LakshSingla left a comment

This PR is also supposed to help with the case of missing results reported in community Slack.
Let us add a test case that fails with the existing code and is fixed by the patch.

As discussed offline, you mentioned that it was due to a single processor producing multiple frames, so we should add that to confirm that the regression is fixed.

Contributor Author

cryptoe commented Oct 11, 2023

@LakshSingla that test case is already added; see SqlStatementResourceHelperTest#testDistinctPartitionsOnEachWorker().

@LakshSingla
Contributor

Okay, I looked at SqlStatementResourceHelper#populatePageList, and we are using counters at execution time to determine the page list. Isn't that risky, considering counters are best-effort, user-facing statistics for the job and not something we can rely on?
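
To make the concern concrete, here is a minimal sketch of deriving a page list from per-worker, per-partition output counters: one page per worker-partition pair that reported rows. It is not the code in SqlStatementResourceHelper#populatePageList; the PageInfo class, the array-based counter layout, and the ordering by partition and then worker are assumptions for illustration. The fields id, numRows, and sizeInBytes mirror those in the status response quoted later in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical page descriptor.
final class PageInfo {
    final long id;
    final int worker;
    final int partition;
    final long numRows;
    final long sizeInBytes;

    PageInfo(long id, int worker, int partition, long numRows, long sizeInBytes) {
        this.id = id;
        this.worker = worker;
        this.partition = partition;
        this.numRows = numRows;
        this.sizeInBytes = sizeInBytes;
    }
}

final class PageListSketch {
    // rows[w][p] and bytes[w][p] are the counters reported by worker w for partition p.
    // Both arrays are assumed to have the same shape.
    static List<PageInfo> pagesFromCounters(long[][] rows, long[][] bytes) {
        int numPartitions = 0;
        for (long[] workerRows : rows) {
            numPartitions = Math.max(numPartitions, workerRows.length);
        }
        List<PageInfo> pages = new ArrayList<>();
        long nextId = 0;
        for (int p = 0; p < numPartitions; p++) {
            for (int w = 0; w < rows.length; w++) {
                // Skip worker-partition pairs that reported no rows.
                if (p < rows[w].length && rows[w][p] > 0) {
                    pages.add(new PageInfo(nextId++, w, p, rows[w][p], bytes[w][p]));
                }
            }
        }
        return pages;
    }
}
```

If a counter snapshot is missing or stale, the derived page list is wrong in the same way, which is the risk raised above.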

Contributor Author

cryptoe commented Oct 12, 2023

@LakshSingla
In theory, I agree that counters are a little risky in the long run and that we should have something else in the task report, but there are no good alternatives yet. Once we go down the path of generating frame indexes, we can think about changing this approach.

Contributor

LakshSingla commented Oct 12, 2023

Once we go down the path of generating frame indexes, we can think about changing this approach.

Why do we need indexes on frames for this?

@LakshSingla
Contributor

@LakshSingla that test case is already added; see SqlStatementResourceHelperTest#testDistinctPartitionsOnEachWorker().

Also, if possible, please add an MSQ select test for the regression. Using counters is risky as is, and we want to make sure that we are tackling the issue we are seeing end to end.

Contributor Author

cryptoe commented Oct 12, 2023

Why do we need indexes on frames for this?

That will help us support use cases in the results API with parameters like startRowOffset and range. Users would then not need to fetch results page by page.
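
Until something like that exists, a client that wants rows starting at a given offset has to work at page granularity. Below is a small sketch of that lookup, given the numRows of each page in order. The class and method are hypothetical, and startRowOffset is used here only as the name suggested in the comment above, not as an existing API parameter.

```java
// Hypothetical client-side helper, not part of Druid.
final class RowOffsetLookup {
    // Returns {pageIndex, offsetWithinPage} for the given global row offset,
    // or null if the offset is at or beyond the total number of rows.
    static long[] locate(long[] rowsPerPage, long startRowOffset) {
        if (startRowOffset < 0) {
            throw new IllegalArgumentException("startRowOffset must be non-negative");
        }
        long remaining = startRowOffset;
        for (int i = 0; i < rowsPerPage.length; i++) {
            if (remaining < rowsPerPage[i]) {
                return new long[]{i, remaining};
            }
            // The offset lies past this page; skip all of its rows.
            remaining -= rowsPerPage[i];
        }
        return null;
    }
}
```

The client would then fetch the located page and discard the first offsetWithinPage rows, which is the extra work the comment hopes frame indexes could avoid.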

Contributor Author

cryptoe commented Oct 12, 2023

@LakshSingla that test case is already added; see SqlStatementResourceHelperTest#testDistinctPartitionsOnEachWorker().

Also, if possible, please add an MSQ select test for the regression. Using counters is risky as is, and we want to make sure that we are tackling the issue we are seeing end to end.

Added another test, SqlMSQStatementResourcePostTest#testMultipleWorkersWithPageSizeLimiting, which covers that case end to end.

Contributor

@LakshSingla LakshSingla left a comment

LGTM post CI 🚀

Contributor Author

cryptoe commented Oct 12, 2023

Thanks @adarshsanjeev @LakshSingla for the review.

@cryptoe cryptoe merged commit 61ea9e0 into apache:master Oct 12, 2023
cryptoe added a commit to cryptoe/druid that referenced this pull request Oct 12, 2023
(cherry picked from commit 61ea9e0)
@cryptoe cryptoe mentioned this pull request Oct 12, 2023
LakshSingla pushed a commit that referenced this pull request Oct 12, 2023
(cherry picked from commit 61ea9e0)
ektravel pushed a commit to ektravel/druid that referenced this pull request Oct 16, 2023
CaseyPan pushed a commit to CaseyPan/druid that referenced this pull request Nov 17, 2023
@LakshSingla LakshSingla added this to the 29.0.0 milestone Jan 29, 2024
@marzi312

Hey @cryptoe,
I tested this feature with a curl POST to /druid/v2/sql/statements, but the rowsPerPage parameter is completely ignored.
Example:
curl -XPOST -H "Content-Type: application/json" -d '{"context": {"includeSegmentSource":"REALTIME","selectDestination":"DURABLESTORAGE", "executionMode":"ASYNC", "durableShuffleStorage": "true", "rowsPerPage":"1000"}, "query":"SELECT * FROM my_table limit 200000"}' https://<url>/druid/v2/sql/statements

The default value of rowsPerPage is 100000, but even the default seems to be ignored for me.
I tested this with Druid v29.

Here are the results from the query status endpoint:
...."pages":[{"id":0,"numRows":200000,"sizeInBytes":123026540}]}

@LakshSingla
Contributor

@marzi312 This happens because your query has a LIMIT clause. It is a bug in the MSQ engine. I'll raise a patch for it.
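
For reference, here is a rough estimate of how many pages the report above should have produced, assuming every page except possibly the last is filled to the limit. The thread only states that the cuts are made globally in the controller, so real page sizes may differ; the helper name is hypothetical.

```java
// Hypothetical helper for a back-of-the-envelope page count.
final class PageCountEstimate {
    // Ceiling division: number of pages needed to hold totalRows at rowsPerPage rows each.
    static long expectedPages(long totalRows, long rowsPerPage) {
        if (totalRows < 0 || rowsPerPage <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and rowsPerPage > 0");
        }
        return (totalRows + rowsPerPage - 1) / rowsPerPage;
    }

    public static void main(String[] args) {
        System.out.println(expectedPages(200_000, 1_000));   // rowsPerPage from the curl example: prints 200
        System.out.println(expectedPages(200_000, 100_000)); // default rowsPerPage: prints 2
    }
}
```

Either way, the single page of 200000 rows in the status response is inconsistent with the configured limit, which matches the bug acknowledged above.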
